package com.ac.driver.mqtt.config;

import com.ac.driver.mqtt.util.MqttUtils;
import com.xj.common.sdk.bean.mqtt.MqttProperties;
import lombok.extern.slf4j.Slf4j;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.springframework.beans.BeansException;
import org.springframework.beans.factory.config.BeanDefinition;
import org.springframework.beans.factory.config.BeanPostProcessor;
import org.springframework.beans.factory.support.AbstractBeanDefinition;
import org.springframework.beans.factory.support.BeanDefinitionBuilder;
import org.springframework.beans.factory.support.DefaultListableBeanFactory;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationContextAware;
import org.springframework.context.ConfigurableApplicationContext;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.channel.DirectChannel;
import org.springframework.integration.mqtt.core.DefaultMqttPahoClientFactory;
import org.springframework.integration.mqtt.core.MqttPahoClientFactory;
import org.springframework.integration.mqtt.inbound.MqttPahoMessageDrivenChannelAdapter;
import org.springframework.integration.mqtt.outbound.MqttPahoMessageHandler;
import org.springframework.integration.mqtt.support.DefaultPahoMessageConverter;
import org.springframework.messaging.MessageChannel;

import javax.annotation.Resource;
import java.io.UnsupportedEncodingException;

/**
 * @ClassName : MqttConfiguration
 * @Description : Mqtt Bean类
 * @Author : Alan
 * @Date: 2020-09-08 11:45
 */
@Slf4j
@Configuration
public class MqttConfiguration implements ApplicationContextAware, BeanPostProcessor {

    private ConfigurableApplicationContext applicationContext;
    @Resource
    private MqttProperties mqttProperties;

    @Override
    public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
        this.applicationContext = (ConfigurableApplicationContext) applicationContext;
        mqttProperties.getConfig().forEach((channelName, config) -> init(channelName, config));
    }

    /**
     * 初始化
     */
    private void init(String channelName, MqttProperties.Config config) {
        DefaultListableBeanFactory beanFactory = (DefaultListableBeanFactory) applicationContext.getBeanFactory();
        // 通道信息
        beanFactory.registerBeanDefinition(channelName, mqttChannel());
        log.info("初始化mqtt, channel {}, 配置 {} ", channelName, config);

        MessageChannel mqttChannel = beanFactory.getBean(channelName, MessageChannel.class);
        beanFactory.registerBeanDefinition(channelName + "-mqttChannelAdapter", channelAdapter(config, mqttChannel));
        log.info("初始化mqtt Channel Adapter");

        String handlerBeanName = channelName + MqttUtils.CHANNEL_NAME_SUFFIX;
        beanFactory.registerBeanDefinition(handlerBeanName, mqttOutbound(config));
        log.info("初始化mqtt mqttChannelAdapter");

        MqttUtils.put(channelName, beanFactory.getBean(handlerBeanName, MqttPahoMessageHandler.class));
    }

    /**
     * mqtt工厂
     *
     * @param config
     * @return
     */
    private MqttPahoClientFactory mqttClientFactory(MqttProperties.Config config, boolean isConsumer) {
        DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory();
        MqttConnectOptions options = new MqttConnectOptions();

        options.setServerURIs(config.getUrl());
        options.setCleanSession(true);
        options.setKeepAliveInterval(config.getKepAliveInterval());
        options.setPassword(config.getPassword().toCharArray());
        options.setUserName(config.getUsername());
        options.setConnectionTimeout(config.getTimeout());

        MqttProperties.Will will = null;
        if (isConsumer && config.getConsumerWill() != null) {
            will = config.getConsumerWill();
        } else if (!isConsumer && config.getProducerWill() != null) {
            will = config.getProducerWill();
        }
        if (will != null) {
            try {
                options.setWill(will.getTopic(), will.getPayload().getBytes("utf-8"), will.getQos(),
                        will.getRetained());
            } catch (UnsupportedEncodingException e) {
                log.error(e.getMessage(), e);
            }
        }

        factory.setConnectionOptions(options);
        return factory;
    }

    /**
     * 初始化通道
     *
     * @return
     */
    private AbstractBeanDefinition mqttChannel() {
        BeanDefinitionBuilder messageChannelBuilder = BeanDefinitionBuilder.genericBeanDefinition(DirectChannel.class);
        messageChannelBuilder.setScope(BeanDefinition.SCOPE_SINGLETON);
        return messageChannelBuilder.getBeanDefinition();
    }

    /**
     * mqtt消息驱动转换器
     *
     * @param config
     * @param mqttChannel
     * @return
     */
    private AbstractBeanDefinition channelAdapter(MqttProperties.Config config, MessageChannel mqttChannel) {
        BeanDefinitionBuilder messageProducerBuilder = BeanDefinitionBuilder
                .genericBeanDefinition(MqttPahoMessageDrivenChannelAdapter.class);
        messageProducerBuilder.setScope(BeanDefinition.SCOPE_SINGLETON);
        messageProducerBuilder
                .addConstructorArgValue(config.getConsumerClientId());
        messageProducerBuilder.addConstructorArgValue(mqttClientFactory(config, true));
        messageProducerBuilder.addConstructorArgValue(config.getTopics());
        messageProducerBuilder.addPropertyValue("converter", new DefaultPahoMessageConverter());
        messageProducerBuilder.addPropertyValue("qos", config.getQos());
        messageProducerBuilder.addPropertyValue("outputChannel", mqttChannel);

        return messageProducerBuilder.getBeanDefinition();
    }

    /**
     * 消息发送客户端
     *
     * @param config
     * @return
     */
    private AbstractBeanDefinition mqttOutbound(MqttProperties.Config config) {
        BeanDefinitionBuilder builder = BeanDefinitionBuilder.genericBeanDefinition(MqttPahoMessageHandler.class);
        builder.addConstructorArgValue(config.getProducerClientId());
        builder.addConstructorArgValue(mqttClientFactory(config, false));
        builder.addPropertyValue("async", config.getAsync());

        return builder.getBeanDefinition();
    }

}
